Skip to content

[SPARK-56324] Introducing message-based communication to Spark -> PySpark communication channel#55716

Open
sven-weber-db wants to merge 1 commit into
apache:masterfrom
sven-weber-db:sven-weber_data/spark-56324
Open

[SPARK-56324] Introducing message-based communication to Spark -> PySpark communication channel#55716
sven-weber-db wants to merge 1 commit into
apache:masterfrom
sven-weber-db:sven-weber_data/spark-56324

Conversation

@sven-weber-db
Copy link
Copy Markdown
Contributor

@sven-weber-db sven-weber-db commented May 6, 2026

What changes were proposed in this pull request?

This is the second in a series of PRs that introduce message-based communication to PySpark UDFs. This initiative is part of SPIP SPARK-55278, which proposes language-agnostic UDFs. This PR builds on top of the changes from PR #55515.

The goal of introducing message-based communication to PySpark is to:

  1. Make the communication between Spark <-> PySpark more structured.
  2. Enable new communication protocols (e.g., gRPC) transparently.

The overall goal is to introduce a second communication channel while keeping the existing channel intact. Specifically, we want to introduce gRPC in addition to Unix Domain Sockets (UDS). The existing UDS channel will not be changed, and its characteristics, including performance, will remain untouched.

This PR specifically propose the following changes:

  1. PythonRunner.scala - Add a new message header and a length field to the initialization data/message send from Spark to PySpark. This change is required to distinguish the initial message from other, later, messages. It constitutes the only required change in the Spark -> PySpark wire protocol.
  2. Add new abstractions to read Spark -> PySpark messages from the existing socket channel - including the new init message
  3. Change worker.py to use the new socket message reader to process the UDF request

With these changes, a new message reader can be implemented and transparently use for other transport channels (e.g. gRPC).

Why are the changes needed?

The changes introduced here make PySpark transport layer agnostic for the Spark -> PySpark channel. This is required for PySpark to support the new, language agnostic UDF protocol proposed in SPIP SPARK-55278. Follow-up PRs will address the PySpark -> Spark communication in a similar manner.

Does this PR introduce any user-facing change?

No

How was this patch tested?

Existing test suites:

PySpark

pyspark.tests.test_worker
pyspark.sql.tests.test_udf
pyspark.sql.tests.test_udtf
pyspark.sql.tests.pandas.test_pandas_udf_scalar
pyspark.sql.tests.arrow.test_arrow_udf_scalar
pyspark.sql.tests.arrow.test_arrow_udf
pyspark.sql.tests.arrow.test_arrow_grouped_map
pyspark.sql.tests.arrow.test_arrow_cogrouped_map
pyspark.tests.test_taskcontext
pyspark.sql.tests.test_python_datasource

Spark

org.apache.spark.sql.execution.python.PythonUDFSuite
org.apache.spark.sql.execution.python.PythonUDTFSuite
org.apache.spark.sql.execution.python.ArrowColumnarPythonUDFSuite
org.apache.spark.sql.execution.python.BatchEvalPythonExecSuite
org.apache.spark.sql.execution.python.PythonDataSourceSuite
org.apache.spark.sql.execution.python.PythonWorkerLogsSuite

Was this patch authored or co-authored using generative AI tooling?

No

@sven-weber-db sven-weber-db force-pushed the sven-weber_data/spark-56324 branch from c0943bb to d22ee1a Compare May 7, 2026 12:22
return None
s = stream.read(length)
return s.decode("utf-8") if self.use_unicode else s
return codecs.decode(s, "utf-8") if self.use_unicode else s
Copy link
Copy Markdown
Contributor Author

@sven-weber-db sven-weber-db May 7, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This change is required because we use the ZeroCopyByteStream class now to read the initial message. ZeroCopyByteStream returns memoryview objects instead of bytes or bytearray. s.decode is only implemented for s{bytes, bytearray}.

According to the Python documentation, for bytes.decode and codecs.decode the later implementation may behave different in case of errors.

bytes.decode states that

errors controls how decoding errors are handled. If 'strict' (the default), a UnicodeError exception is raised.

codecs.decode states that

The default error handler is 'strict' meaning that decoding errors raise ValueError (or a more codec specific subclass, such as UnicodeDecodeError)

As decoding errors are unexpected and the specific Exception type that is thrown should not matter, I believe this change is safe. However, if there are concerns we can change this implementation to the following:

return bytes(s).decode("utf-8") if self.use_unicode else s

This alternative implementation will invoke a memory copy to copy the memoryview contents into a bytes object before decoding. Given that this call is only invoked on deserialization of the initial message, this memory copy is probably acceptable as it is a one time cost.

Comment thread python/pyspark/worker.py

@with_faulthandler
def main(infile, outfile):
def invoke_udf(message_receiver: SparkMessageReceiver, outfile: BinaryIO):
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Happy to rename this function if another name is prefered

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it is fine for now

@sven-weber-db sven-weber-db marked this pull request as ready for review May 7, 2026 12:45
@sven-weber-db sven-weber-db force-pushed the sven-weber_data/spark-56324 branch 4 times, most recently from 7f25a61 to 70c9aba Compare May 8, 2026 13:44
@sven-weber-db sven-weber-db force-pushed the sven-weber_data/spark-56324 branch from 70c9aba to 25b19a6 Compare May 8, 2026 16:55

@classmethod
def _setTaskContext(cls: Type["TaskContext"], taskContext: "TaskContext") -> None:
def _setTaskContext(cls: Type["TaskContext"], taskContext: Optional["TaskContext"]) -> None:
Copy link
Copy Markdown
Contributor Author

@sven-weber-db sven-weber-db May 8, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This change is required to make mypy happy due to the following call in worker.py:

# Cleanup
# Reset task context to None. This is a guard code to avoid residual context when worker
# reuse.
TaskContext._setTaskContext(None)
BarrierTaskContext._setTaskContext(None)

It is unclear to my why this type check if failing only now since this code has been in worker.py for a long time.

Copy link
Copy Markdown
Contributor

@Yicong-Huang Yicong-Huang left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM! Left inline comments. I think we need some unit tests + benchmark proof for the change.

Comment thread python/pyspark/worker.py
Comment thread python/pyspark/worker.py
# The type checker needs some help here..
# See the code in WorkerInitInfo.from_stream(infile)
# to see the correct type.
assert isinstance(init_info.udf_info, memoryview)
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think adding runtime assertions is a good idea. to make mypy happy, maybe typing.cast(memoryview, init_info.udf_info)?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

While typing.cast(memoryview, init_info.udf_info) would help mypy, it also means that init_info.udf_info is not guaranteed to be of type memoryview. If someone were to change the code in WorkerInitInfo.from_stream to return a different type for the NON_UDF case, this issue would not be caught and might lead to subtle run-time errors. My rationale for asserting here was to prevent such a situation.

However, I agree that an assert is probably not the solution here. I revisited the problem, and I think I have found a nice solution using a TypeGuard instead. In worker_message.py, the following TypeGuard has been added:

def is_non_udf_info(
    udf_info: UdfInfoType,
    eval_type: int,
) -> TypeGuard[memoryview]:
    """TypeGuard that narrows udf_info to memoryview when eval_type is NON_UDF."""
    return eval_type == PythonEvalType.NON_UDF

This can be used in worker.py instead of the if eval_type == PythonEvalType.NON_UDF:, which resolves the typing error. Additionally, it keeps the typing logic close to the source where udf_info is defined. Let's discuss if you prefer a different solution!

Comment thread python/pyspark/messages/spark_message_receiver.py
Comment thread python/pyspark/messages/socket/spark_socket_message_receiver.py
Comment thread python/pyspark/worker.py
@with_faulthandler
def main(infile, outfile):
# Instantiate socket message readers for executing the UDF
socket_reader = SparkSocketMessageReceiver(infile)
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I still prefer we have a flag and by default use the original code path. replacing it directly would need some benchmark proof.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants